<?php
namespace hs\rabbitmq;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use think\facade\Log;
use util\Util;

/**
 * RabbitMQ message producer.
 *
 * Publishes JSON-encoded, persistent messages either to the regular exchange
 * ({@see Producer::publish()}) or to the delayed exchange provided by the
 * "x-delayed-message" plugin ({@see Producer::publishDelayed()}).
 *
 * The members $connection, $topic, $queue and $queue_exec_delay as well as the
 * methods insertTask(), setQueueMsg(), getTask() and setNextExecTime() are not
 * defined in this class; they are presumably inherited from Common.
 *
 * Class Producer
 * @package hs\rabbitmq
 */
class Producer extends Common
{
    /**
     * Configuration of the queue addressed by the last publishDelayed() call
     * (copied from $this->queue[$key]).
     *
     * @var array
     */
    protected $queueConfig = [];

    /**
     * Publish a message to the regular exchange.
     *
     * The message is routed with the routing key "<$key>.queue". On success the
     * channel and the connection are closed, so the instance cannot be reused
     * for further publishing afterwards.
     *
     * Exceptions are not propagated: they are logged and reported as false.
     *
     * @param array  $param Message payload, sent JSON-encoded.
     * @param string $key   Routing key prefix.
     * @return bool true when the message was handed to the broker, false on any failure.
     */
    public function publish(array $param, string $key): bool
    {
        $channel = null;

        try {
            // Open a channel on the existing connection.
            $channel = $this->connection->channel();

            // Make sure the exchange exists (non-passive, durable, not auto-deleted).
            $channel->exchange_declare(
                $this->topic['exchange_name'],
                $this->topic['exchange_type'],
                false,
                true,
                false
            );

            // json_encode() returns false on failure; never publish a broken body.
            $body = json_encode($param);
            if ($body === false) {
                throw new \Exception('json_encode failed: ' . json_last_error_msg());
            }

            // Note: the property name is "content_type"; php-amqplib silently
            // drops unknown property keys such as "content-type".
            $msg = new AMQPMessage($body, [
                'content_type'  => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
            ]);

            // Publish to the exchange.
            $channel->basic_publish($msg, $this->topic['exchange_name'], $key . '.queue');

            // Closing stays inside the try block on purpose: a failure while
            // closing is reported to the caller as an unsuccessful publish.
            $channel->close();
            $this->connection->close();
            return true;
        } catch (\Exception $exception) {
            Log::error('Producer.Exception:' . $exception->getMessage());

            // Best-effort release of the channel opened above so that failed
            // attempts do not pile up open channels on the connection.
            if ($channel !== null) {
                try {
                    $channel->close();
                } catch (\Exception $closeException) {
                    // Nothing more can be done here; the original error is already logged.
                }
            }
            return false;
        }
    }

    /**
     * Publish a delayed message through the "x-delayed-message" exchange.
     *
     * The queue configuration for $key is merged into the payload, a task is
     * created (or looked up) via createTask(), and the message is published to
     * the exchange "delayed_<exchange_name>" using the configured route_key.
     * The delay in milliseconds is taken from $this->queue_exec_delay after
     * calling setNextExecTime().
     *
     * Exceptions are not propagated: they are logged and reported as false.
     *
     * @param array  $param Message payload.
     * @param string $key   Key of the queue configuration in $this->queue.
     * @return bool true when the message was handed to the broker, false on any failure.
     */
    public function publishDelayed(array $param, string $key): bool
    {
        $channel = null;

        try {
            // An unknown key would otherwise feed null into array_merge(), which
            // raises a TypeError on PHP 8 that the catch block below cannot handle.
            if (!isset($this->queue[$key]) || !is_array($this->queue[$key])) {
                throw new \Exception("Queue configuration not found for key: {$key}");
            }

            $this->queueConfig = $this->queue[$key];
            $param = $this->createTask(array_merge($param, $this->queueConfig));
            if (empty($param['solevar'])) {
                throw new \Exception("任务创建失败");
            }

            // Open a channel on the existing connection.
            $channel = $this->connection->channel();

            // Routing behaviour of the delayed exchange once the delay expired.
            $args = new AMQPTable([
                'x-delayed-type' => 'direct'
            ]);

            // The "x-delayed-message" exchange type is provided by the delayed
            // message plugin, which requires the "x-delayed-type" argument when
            // the exchange is created, so $args must be passed along.
            $channel->exchange_declare(
                "delayed_{$this->topic['exchange_name']}",
                "x-delayed-message",
                false,
                true,
                false,
                false,
                false,
                $args
            );
            // The queue itself is neither declared nor bound here; presumably
            // that is done on the consumer side - TODO confirm.

            $this->setNextExecTime();

            // json_encode() returns false on failure; never publish a broken body.
            $body = json_encode($param);
            if ($body === false) {
                throw new \Exception('json_encode failed: ' . json_last_error_msg());
            }

            // Note: the property name is "content_type"; php-amqplib silently
            // drops unknown property keys such as "content-type".
            $msg = new AMQPMessage($body, [
                'content_type'         => 'application/json',
                'delivery_mode'        => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'application_headers'  => new AMQPTable([
                    // Delay in milliseconds.
                    'x-delay' => $this->queue_exec_delay
                ])
            ]);

            $channel->basic_publish(
                $msg,
                "delayed_{$this->topic['exchange_name']}",
                $this->queueConfig['route_key']
            );

            // Closing stays inside the try block on purpose: a failure while
            // closing is reported to the caller as an unsuccessful publish.
            $channel->close();
            $this->connection->close();
            return true;
        } catch (\Exception $exception) {
            Log::error('publishDelayed.Exception:' . $exception->getMessage());

            // Best-effort release of the channel opened above so that failed
            // attempts do not pile up open channels on the connection.
            if ($channel !== null) {
                try {
                    $channel->close();
                } catch (\Exception $closeException) {
                    // Nothing more can be done here; the original error is already logged.
                }
            }
            return false;
        }
    }

    /**
     * Ensure the payload is associated with a task.
     *
     * When the payload has no "solevar" entry, a task is inserted and the value
     * returned by insertTask() is stored under that key. Otherwise
     * setQueueMsg($param)->getTask() is invoked; its result is not used here.
     *
     * @param array $param Message payload.
     * @return array The payload, with "solevar" set when a task was inserted.
     */
    public function createTask(array $param)
    {
        if (!isset($param['solevar'])) {
            $param['solevar'] = $this->insertTask($param);
        } else {
            $this->setQueueMsg($param)->getTask();
        }
        return $param;
    }

}
